/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.extensions.sql.meta.provider.text;



import com.alibaba.fastjson.JSONObject;

import com.google.auto.service.AutoService;

import com.bff.gaia.unified.sdk.extensions.sql.UnifiedSqlTable;

import com.bff.gaia.unified.sdk.extensions.sql.meta.Table;

import com.bff.gaia.unified.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;

import com.bff.gaia.unified.sdk.extensions.sql.meta.provider.TableProvider;

import com.bff.gaia.unified.sdk.schemas.Schema;

import com.bff.gaia.unified.sdk.transforms.FlatMapElements;

import com.bff.gaia.unified.sdk.transforms.MapElements;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.sdk.values.TypeDescriptors;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableSet;

import org.apache.commons.csv.CSVFormat;



import javax.annotation.Nullable;

import java.io.Serializable;



import static com.bff.gaia.unified.sdk.extensions.sql.impl.schema.UnifiedTableUtils.unifiedRow2CsvLine;

import static com.bff.gaia.unified.sdk.extensions.sql.impl.schema.UnifiedTableUtils.csvLines2UnifiedRows;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/**

 * Text table provider.

 *

 * <p>A sample of text table is:

 *

 * <pre>{@code

 * CREATE TABLE ORDERS(

 *   ID INT COMMENT 'this is the primary key',

 *   NAME VARCHAR(127) COMMENT 'this is the name'

 * )

 * TYPE 'text'

 * COMMENT 'this is the table orders'

 * LOCATION '/home/admin/orders'

 * TBLPROPERTIES '{"format":"csv", "csvformat": "Excel"}' -- format of each text line(csv format)

 * }</pre>

 */

@AutoService(TableProvider.class)

public class TextTableProvider extends InMemoryMetaTableProvider {



  @Override

  public String getTableType() {

    return "text";

  }



  @Override

  public UnifiedSqlTable buildUnifiedSqlTable(Table table) {

    Schema schema = table.getSchema();



    String filePattern = table.getLocation();

    JSONObject properties = table.getProperties();

    String format = MoreObjects.firstNonNull(properties.getString("format"), "csv");



    // Backwards compatibility: previously "type": "text" meant CSV and "format" was where the

    // CSV format went. So assume that any other format is the CSV format.

    @Nullable String legacyCsvFormat = null;

    if (!ImmutableSet.of("csv", "lines").contains(format)) {

      legacyCsvFormat = format;

      format = "csv";

    }



    switch (format) {

      case "csv":

        String specifiedCsvFormat = properties.getString("csvformat");

        CSVFormat csvFormat =

            specifiedCsvFormat != null

                ? CSVFormat.valueOf(specifiedCsvFormat)

                : (legacyCsvFormat != null

                    ? CSVFormat.valueOf(legacyCsvFormat)

                    : CSVFormat.DEFAULT);

        return new TextTable(

            schema, filePattern, new CsvToRow(schema, csvFormat), new RowToCsv(csvFormat));

      case "lines":

        checkArgument(

            schema.getFieldCount() == 1

                && schema.getField(0).getType().getTypeName().equals(Schema.TypeName.STRING),

            "Table with type 'text' and format 'lines' "

                + "must have exactly one STRING/VARCHAR/CHAR column ");

        return new TextTable(

            schema, filePattern, new LinesReadConverter(), new LinesWriteConverter());

      default:

        throw new IllegalArgumentException(

            "Table with type 'text' must have format 'csv' or 'lines'");

    }

  }



  /** Write-side converter for for {@link TextTable} with format {@code 'lines'}. */

  public static class LinesWriteConverter extends PTransform<PCollection<Row>, PCollection<String>>

      implements Serializable {

    private static final Schema SCHEMA = Schema.builder().addStringField("line").build();



    public LinesWriteConverter() {}



    @Override

    public PCollection<String> expand(PCollection<Row> input) {

      return input.apply(

          "rowsToLines",

          MapElements.into(TypeDescriptors.strings()).via((Row row) -> row.getString(0) + "\n"));

    }

  }



  /** Read-side converter for {@link TextTable} with format {@code 'lines'}. */

  public static class LinesReadConverter extends PTransform<PCollection<String>, PCollection<Row>>

      implements Serializable {



    private static final Schema SCHEMA = Schema.builder().addStringField("line").build();



    public LinesReadConverter() {}



    @Override

    public PCollection<Row> expand(PCollection<String> input) {

      return input

          .apply(

              "linesToRows",

              MapElements.into(TypeDescriptors.rows())

                  .via(s -> Row.withSchema(SCHEMA).addValue(s).build()))

          .setRowSchema(SCHEMA);

    }

  }



  /** Write-side converter for {@link TextTable} with format {@code 'csv'}. */

  @VisibleForTesting

  static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>

      implements Serializable {



    private CSVFormat csvFormat;



    public RowToCsv(CSVFormat csvFormat) {

      this.csvFormat = csvFormat;

    }



    @VisibleForTesting

    public CSVFormat getCsvFormat() {

      return csvFormat;

    }



    @Override

    public PCollection<String> expand(PCollection<Row> input) {

      return input.apply(

          "rowToCsv",

          MapElements.into(TypeDescriptors.strings()).via(row -> unifiedRow2CsvLine(row, csvFormat)));

    }

  }



  /** Read-side converter for {@link TextTable} with format {@code 'csv'}. */

  @VisibleForTesting

  public static class CsvToRow extends PTransform<PCollection<String>, PCollection<Row>>

      implements Serializable {



    private Schema schema;

    private CSVFormat csvFormat;



    @VisibleForTesting

    public CSVFormat getCsvFormat() {

      return csvFormat;

    }



    public CsvToRow(Schema schema, CSVFormat csvFormat) {

      this.schema = schema;

      this.csvFormat = csvFormat;

    }



    @Override

    public PCollection<Row> expand(PCollection<String> input) {

      return input

          .apply(

              "csvToRow",

              FlatMapElements.into(TypeDescriptors.rows())

                  .via(s -> csvLines2UnifiedRows(csvFormat, s, schema)))

          .setRowSchema(schema);

    }

  }

}